use std::cmp::Ordering;
use std::fmt::Display;
use std::fmt::Formatter;
use std::fmt::Write;
use std::hash::Hash;
use std::marker::PhantomData;
use std::sync::Arc;
use std::sync::LazyLock;

use apollo_compiler::Name;
use apollo_compiler::Node;
use apollo_compiler::ast::Type;
use apollo_compiler::ast::Value;
use apollo_compiler::collections::IndexMap;
use apollo_compiler::collections::IndexSet;
use either::Either;
use itertools::Itertools;
use itertools::izip;
use operation::OpGraphPathContext;
use operation::OpPathElement;
use petgraph::graph::EdgeIndex;
use petgraph::graph::EdgeReference;
use petgraph::graph::NodeIndex;
use petgraph::visit::EdgeRef;
use tracing::debug;
use tracing::debug_span;

use super::condition_resolver::ContextMapEntry;
use crate::bail;
use crate::error::FederationError;
use crate::link::federation_spec_definition::get_federation_spec_definition_from_subgraph;
use crate::link::graphql_definition::DeferDirectiveArguments;
use crate::operation::Field;
use crate::operation::FieldSetDisplay;
use crate::operation::Selection;
use crate::operation::SelectionSet;
use crate::query_graph::OverrideConditions;
use crate::query_graph::QueryGraph;
use crate::query_graph::QueryGraphEdge;
use crate::query_graph::QueryGraphEdgeTransition;
use crate::query_graph::QueryGraphNode;
use crate::query_graph::QueryGraphNodeType;
use crate::query_graph::condition_resolver::ConditionResolution;
use crate::query_graph::condition_resolver::ConditionResolver;
use crate::query_graph::condition_resolver::UnsatisfiedConditionReason;
use crate::query_graph::path_tree::OpPathTree;
use crate::query_plan::FetchDataPathElement;
use crate::query_plan::QueryPlanCost;
use crate::schema::ValidFederationSchema;
use crate::schema::field_set::parse_field_value_without_validation;
use crate::schema::field_set::validate_field_value;
use crate::schema::position::CompositeTypeDefinitionPosition;
use crate::schema::position::ObjectTypeDefinitionPosition;
use crate::schema::position::OutputTypeDefinitionPosition;
use crate::schema::position::SchemaRootDefinitionKind;
use crate::utils::FallibleIterator;
use crate::utils::logging::snapshot;

pub(crate) mod operation;
pub(crate) mod transition;

#[derive(Clone, serde::Serialize, Debug, Eq, PartialEq)]
pub(crate) struct ContextUsageEntry {
    pub(crate) context_id: Name,
    pub(crate) relative_path: Vec<FetchDataPathElement>,
    pub(crate) selection_set: SelectionSet,
    pub(crate) subgraph_argument_type: Node<Type>,
}

/// An immutable path in a query graph.
///
/// A "path" here is mostly understood in the graph-theoretical sense of the term, i.e. as "a
/// connected series of edges"; a `GraphPath` is generated by traversing a query graph.
///
/// However, as query graph edges may have conditions, a `GraphPath` also records, for each edge it
/// is composed of, the set of paths (an `OpPathTree` in practice) that were taken to fulfill each
/// edge's conditions (when an edge has one).
///
/// Additionally, for each edge of the path, a `GraphPath` records the "trigger" that made the
/// traversal take that edge. In practice, the "trigger" can be seen as a way to decorate a path
/// with some additional metadata for each element of the path. In practice, that trigger is used in
/// 2 main ways (corresponding to our 2 main query graph traversals):
/// - For composition validation, the traversal of the federated query graph is driven by other
///   transitions into the supergraph API query graph (essentially, composition validation is about
///   finding, for every path in supergraph API query graph, a "matching" traversal of the federated
///   query graph). In that case, for the graph paths we build on the federated query graph, the
///   "trigger" will be one of the edge transitions from the supergraph API query graph (which,
///   granted, will be fairly similar to the one of the edge we're taking in the federated query
///   graph; in practice, triggers are more useful in the query planning case).
/// - For query planning, the traversal of the federated query graph is driven by the elements of
///   the query we are planning. Which means that the "trigger" for taking an edge in this case will
///   be an operation element (or `None`). See the specialized `OpGraphPath` that is defined for this
///   use case.
///
/// Lastly, some `GraphPath`s can actually encode `None` edges: this is used during query planning
/// in the (rare) case where the query we plan for has an inline fragment spread without type
/// condition (or a "useless" one, i.e. one that doesn't restrict the possible types anymore than
/// they already were) but with some directives. In that case, we want to preserve the information
/// about the directive (to properly rebuild query plans later) but it doesn't correspond to taking
/// any edges, so we add a `None` edge and use the trigger to store the fragment spread.
///
/// Regarding type parameters:
/// - `TTrigger`: The type of the path's "triggers", metadata that can associated to each element
///   of the path (see above for more details).
/// - `TEdge`: The type of the edge. Either `Option<EdgeIndex>` (meaning that the path may have a
///   `None` edge) or `never` (the path cannot have `None` edges).
// PORT_NOTE: The JS codebase also parameterized whether the head of the path was a root node, but
// in the Rust code we don't have a distinguished type for that case. We instead check this at
// runtime (at the callsites that require root nodes). This means the `RootPath` type in the
// JS codebase is replaced with this one.
#[derive(Clone, serde::Serialize)]
pub(crate) struct GraphPath<TTrigger, TEdge>
where
    TTrigger: Eq + Hash + GraphPathTriggerVariant,
    TEdge: Copy + Into<Option<EdgeIndex>>,
    EdgeIndex: Into<TEdge>,
{
    /// The query graph of which this is a path.
    #[serde(skip)]
    graph: Arc<QueryGraph>,
    /// The node at which the path starts. This should be the head of the first non-`None` edge in
    /// the path if such edge exists, but if there are only `None` edges (or if there are zero
    /// edges), this will still exist (and the head and tail of the path will be the same).
    head: NodeIndex,
    /// The node at which the path stops. This should be the tail of the last non-`None` edge in the
    /// path if such edge exists, but if there are only `None` edges (or if there are zero edges),
    /// this will still exist (and the head and tail of the path will be the same).
    tail: NodeIndex,
    /// The edges composing the path.
    edges: Vec<TEdge>,
    /// The triggers associated to each edge in the path.
    edge_triggers: Vec<Arc<TTrigger>>,
    /// For each edge in the path, if the edge has conditions, the set of paths that fulfill that
    /// condition.
    ///
    /// Note that no matter which kind of traversal we are doing (composition or query planning),
    /// fulfilling the conditions is always driven by the conditions themselves, and since
    /// conditions are a GraphQL result set, the resulting set of paths are an `OpGraphPath` (and
    /// since they start at the edge's head node, we use the `OpPathTree` representation for that
    /// set of paths).
    edge_conditions: Vec<Option<Arc<OpPathTree>>>,
    /// Information about the last subgraph-entering edge in this path, which is used to eliminate
    /// some non-optimal paths. (This is reset when encountering a `@defer` application.)
    last_subgraph_entering_edge_info: Option<SubgraphEnteringEdgeInfo>,
    /// Names of all the possible runtime types the tail of the path can be.
    runtime_types_of_tail: Arc<IndexSet<ObjectTypeDefinitionPosition>>,
    /// If the last edge in the `edges` array was a `DownCast` transition, then the runtime types
    /// before that edge.
    runtime_types_before_tail_if_last_is_cast: Option<Arc<IndexSet<ObjectTypeDefinitionPosition>>>,
    /// If the trigger of the last edge in the `edges` array was an operation element with a
    /// `@defer` application, then the arguments of that application.
    // TODO(@TylerBloom): Add in once defer is supported.
    #[serde(skip)]
    defer_on_tail: Option<DeferDirectiveArguments>,
    // PORT_NOTE: This field was renamed because the JS name (`contextToSelection`) implied it was
    // a map to selections, which it isn't.
    /// The IDs of contexts that have matched at the edge, for each edge in the path.
    matching_context_ids: Vec<Option<MatchingContextIds>>,
    // PORT_NOTE: This field was renamed because the JS name (`parameterToContext`) left confusion
    // to how a parameter was different from an argument.
    /// Maps of @fromContext arguments to info about the contexts used in those arguments, for each
    /// edge in the path.
    arguments_to_context_usages: Vec<Option<ArgumentsToContextUsages>>,
}

impl<TTrigger, TEdge> std::fmt::Debug for GraphPath<TTrigger, TEdge>
where
    TTrigger: Eq + Hash + GraphPathTriggerVariant,
    TEdge: Copy + Into<Option<EdgeIndex>>,
    EdgeIndex: Into<TEdge>,
    // In addition to the bounds of the GraphPath struct, also require Debug:
    TTrigger: std::fmt::Debug,
    TEdge: std::fmt::Debug,
{
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let Self {
            graph: _, // skip
            head,
            tail,
            edges,
            edge_triggers,
            edge_conditions,
            last_subgraph_entering_edge_info,
            runtime_types_of_tail,
            runtime_types_before_tail_if_last_is_cast,
            defer_on_tail,
            matching_context_ids: _,
            arguments_to_context_usages: _,
        } = self;

        f.debug_struct("GraphPath")
            .field("head", head)
            .field("tail", tail)
            .field("edges", edges)
            .field("edge_triggers", edge_triggers)
            .field("edge_conditions", edge_conditions)
            .field(
                "last_subgraph_entering_edge_info",
                last_subgraph_entering_edge_info,
            )
            .field("runtime_types_of_tail", runtime_types_of_tail)
            .field(
                "runtime_types_before_tail_if_last_is_cast",
                runtime_types_before_tail_if_last_is_cast,
            )
            .field("defer_on_tail", defer_on_tail)
            .finish_non_exhaustive()
    }
}

#[derive(Debug, Clone, serde::Serialize)]
pub(crate) struct SubgraphEnteringEdgeInfo {
    /// The index within the `edges` array.
    index: usize,
    /// The cost of resolving the conditions for this edge.
    conditions_cost: QueryPlanCost,
}

pub(crate) type MatchingContextIds = IndexSet<Name>;
pub(crate) type ArgumentsToContextUsages = IndexMap<Name, ContextUsageEntry>;

/// The item type for [`GraphPath::iter`]
pub(crate) type GraphPathItem<'path, TTrigger, TEdge> = (
    TEdge,
    &'path Arc<TTrigger>,
    &'path Option<Arc<OpPathTree>>,
    Option<&'path MatchingContextIds>,
    Option<&'path ArgumentsToContextUsages>,
);

/// A "set" of excluded destinations (i.e. subgraph names). Note that we use a `Vec` instead of set
/// because this is used in pretty hot paths (the whole path computation is CPU intensive) and will
/// basically always be tiny (it's bounded by the number of distinct key on a given type, so usually
/// 2-3 max; even in completely unrealistic cases, it's hard bounded by the number of subgraphs), so
/// a `Vec` is going to perform a lot better than `IndexSet` in practice.
#[derive(Debug, Clone, serde::Serialize)]
pub(crate) struct ExcludedDestinations(Arc<Vec<Arc<str>>>);

impl ExcludedDestinations {
    fn is_excluded(&self, destination: &str) -> bool {
        self.0
            .iter()
            .any(|excluded| excluded.as_ref() == destination)
    }

    fn add_excluded(&self, destination: &Arc<str>) -> Self {
        if !self.is_excluded(destination) {
            let mut new = self.0.as_ref().clone();
            new.push(destination.clone());
            Self(Arc::new(new))
        } else {
            self.clone()
        }
    }
}

impl PartialEq for ExcludedDestinations {
    /// See if two `ExcludedDestinations` have the same set of values, regardless of their ordering.
    fn eq(&self, other: &ExcludedDestinations) -> bool {
        self.0.len() == other.0.len() && self.0.iter().all(|x| other.0.contains(x))
    }
}

impl Default for ExcludedDestinations {
    fn default() -> Self {
        ExcludedDestinations(Arc::new(vec![]))
    }
}

#[derive(Debug, Clone, serde::Serialize)]
pub(crate) struct ExcludedConditions(Arc<Vec<Arc<SelectionSet>>>);

impl ExcludedConditions {
    pub(crate) fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    fn is_excluded(&self, condition: Option<&Arc<SelectionSet>>) -> bool {
        let Some(condition) = condition else {
            return false;
        };
        self.0.contains(condition)
    }

    /// Immutable version of `push`.
    pub(crate) fn add_item(&self, value: &SelectionSet) -> ExcludedConditions {
        let mut result = self.0.as_ref().clone();
        result.push(value.clone().into());
        ExcludedConditions(Arc::new(result))
    }
}

impl Default for ExcludedConditions {
    fn default() -> Self {
        ExcludedConditions(Arc::new(vec![]))
    }
}

#[derive(serde::Serialize)]
pub(crate) struct IndirectPaths<TTrigger, TEdge, TDeadEnds>
where
    TTrigger: Eq + Hash + GraphPathTriggerVariant,
    TEdge: Copy + Into<Option<EdgeIndex>>,
    EdgeIndex: Into<TEdge>,
    UnadvanceableClosures: Into<TDeadEnds>,
{
    paths: Arc<Vec<Arc<GraphPath<TTrigger, TEdge>>>>,
    #[serde(skip_serializing)]
    dead_ends: TDeadEnds,
}

#[derive(Clone)]
pub(crate) struct UnadvanceableClosures(Arc<Vec<UnadvanceableClosure>>);

impl From<UnadvanceableClosures> for () {
    fn from(_: UnadvanceableClosures) -> Self {}
}

type UnadvanceableClosure = Arc<
    LazyLock<
        Result<Unadvanceables, FederationError>,
        Box<dyn FnOnce() -> Result<Unadvanceables, FederationError> + Send + Sync>,
    >,
>;

#[derive(Debug, Clone, serde::Serialize)]
pub(crate) struct Unadvanceables(Arc<Vec<Arc<Unadvanceable>>>);

impl Display for Unadvanceables {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_char('[')?;
        let mut unadvanceables = self.0.iter();
        if let Some(unadvanceable) = unadvanceables.next() {
            unadvanceable.fmt(f)?;
            for unadvanceable in unadvanceables {
                f.write_str(", ")?;
                unadvanceable.fmt(f)?;
            }
        }
        f.write_char(']')
    }
}

impl Unadvanceables {
    #[allow(dead_code)]
    pub(crate) fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    pub(crate) fn iter(&self) -> impl Iterator<Item = &Arc<Unadvanceable>> {
        self.0.iter()
    }
}

impl TryFrom<UnadvanceableClosure> for Unadvanceables {
    type Error = FederationError;

    fn try_from(value: UnadvanceableClosure) -> Result<Self, Self::Error> {
        (*value).clone()
    }
}

impl TryFrom<UnadvanceableClosures> for Unadvanceables {
    type Error = FederationError;

    fn try_from(value: UnadvanceableClosures) -> Result<Self, Self::Error> {
        Ok(Unadvanceables(Arc::new(
            value
                .0
                .iter()
                .map(|closure| (**closure).clone())
                .process_results(|iter| {
                    // Lending iterators aren't in Rust yet, so using a loop.
                    let mut all_unadvanceables = vec![];
                    for unadvanceables in iter {
                        all_unadvanceables.extend(unadvanceables.0.iter().cloned());
                    }
                    all_unadvanceables
                })?,
        )))
    }
}

#[derive(Debug, Clone, serde::Serialize)]
pub(crate) struct Unadvanceable {
    reason: UnadvanceableReason,
    from_subgraph: Arc<str>,
    to_subgraph: Arc<str>,
    details: String,
}

impl Display for Unadvanceable {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "[{}]({}->{}) {}",
            self.reason, self.from_subgraph, self.to_subgraph, self.details
        )
    }
}

impl Unadvanceable {
    #[allow(dead_code)]
    pub(crate) fn reason(&self) -> &UnadvanceableReason {
        &self.reason
    }

    /// Returns `self.from_subgraph`. It's named `source_subgraph`, since `fn from_subgraph()` may
    /// be ambiguous with the Rust `from_*` convention.
    pub(crate) fn source_subgraph(&self) -> &str {
        &self.from_subgraph
    }

    /// Returns `self.to_subgraph`. It's named `dest_subgraph`, since `fn to_subgraph()` may
    /// be ambiguous with the Rust `to_*` convention.
    #[allow(dead_code)]
    pub(crate) fn dest_subgraph(&self) -> &str {
        &self.to_subgraph
    }

    pub(crate) fn details(&self) -> &str {
        &self.details
    }
}

#[derive(Debug, Clone, strum_macros::Display, serde::Serialize)]
#[allow(dead_code)]
pub(crate) enum UnadvanceableReason {
    UnsatisfiableKeyCondition,
    UnsatisfiableRequiresCondition,
    UnresolvableInterfaceObject,
    NoMatchingTransition,
    UnreachableType,
    IgnoredIndirectPath,
    UnsatisfiableOverrideCondition,
}

// A drop-in replacement for `BinaryHeap`, but behaves more like JS QP's `popMin` method.
struct MaxHeap<T>
where
    T: Ord,
{
    items: Vec<T>,
}

impl<T> MaxHeap<T>
where
    T: Ord,
{
    fn new() -> Self {
        Self { items: Vec::new() }
    }

    fn push(&mut self, item: T) {
        self.items.push(item);
    }

    fn pop(&mut self) -> Option<T> {
        // PORT_NOTE: JS QP returns the max item, but favors the first inserted one if there are
        //            multiple maximum items.
        // Note: `position_max` returns the last of the equally maximum items. Thus, we use
        //       `position_min_by` by reversing the ordering.
        let pos = self.items.iter().position_min_by(|a, b| b.cmp(a))?;
        Some(self.items.remove(pos))
    }
}

/// `GraphPath` is generic over two types, `TTrigger` and `TEdge`. This trait helps abstract over
/// the `TTrigger` type bound. A `TTrigger` is one of the two types that make up the variants of
/// the `GraphPathTrigger`. Rather than trying to cast into concrete types and cast back (and
/// potentially raise errors), this trait provides ways to access the data needed within.
pub(crate) trait GraphPathTriggerVariant: Eq + Hash + std::fmt::Debug {
    fn get_field_parent_type(&self) -> Option<CompositeTypeDefinitionPosition>;
    fn get_field_mut(&mut self) -> Option<&mut Field>;
    fn get_op_path_element(&self) -> Option<&OpPathElement>;
}

impl<TTrigger, TEdge> GraphPath<TTrigger, TEdge>
where
    TTrigger: GraphPathTriggerVariant + Display + Send + Sync + 'static,
    TEdge: Copy + Into<Option<EdgeIndex>> + std::fmt::Debug + Send + Sync + 'static,
    EdgeIndex: Into<TEdge>,
{
    pub(crate) fn graph(&self) -> &Arc<QueryGraph> {
        &self.graph
    }
    pub(crate) fn tail(&self) -> NodeIndex {
        self.tail
    }
    pub(crate) fn runtime_types_of_tail(&self) -> &Arc<IndexSet<ObjectTypeDefinitionPosition>> {
        &self.runtime_types_of_tail
    }

    /// Creates a new (empty) path starting at the provided `head` node.
    pub(crate) fn new(graph: Arc<QueryGraph>, head: NodeIndex) -> Result<Self, FederationError> {
        let mut path = Self {
            graph,
            head,
            tail: head,
            edges: vec![],
            edge_triggers: vec![],
            edge_conditions: vec![],
            last_subgraph_entering_edge_info: None,
            runtime_types_of_tail: Arc::new(IndexSet::default()),
            runtime_types_before_tail_if_last_is_cast: None,
            defer_on_tail: None,
            matching_context_ids: Vec::default(),
            arguments_to_context_usages: Vec::default(),
        };
        path.runtime_types_of_tail = Arc::new(path.head_possible_runtime_types()?);
        Ok(path)
    }

    /// Creates a new (empty) path starting from the root node in `graph` corresponding to the
    /// provided `root_kind`.
    pub(crate) fn from_graph_root(
        graph: Arc<QueryGraph>,
        root_kind: SchemaRootDefinitionKind,
    ) -> Result<Self, FederationError> {
        let Some(root_node) = graph.root_kinds_to_nodes()?.get(&root_kind).copied() else {
            bail!("Unexpectedly no root node for the root kind {}", root_kind);
        };
        GraphPath::new(graph, root_node)
    }

    fn head_possible_runtime_types(
        &self,
    ) -> Result<IndexSet<ObjectTypeDefinitionPosition>, FederationError> {
        let head_weight = self.graph.node_weight(self.head)?;
        Ok(match &head_weight.type_ {
            QueryGraphNodeType::SchemaType(head_type_pos) => {
                let head_type_pos: CompositeTypeDefinitionPosition =
                    head_type_pos.clone().try_into()?;
                self.graph
                    .schema_by_source(&head_weight.source)?
                    .possible_runtime_types(head_type_pos)?
            }
            QueryGraphNodeType::FederatedRootType(_) => IndexSet::default(),
        })
    }

    pub(crate) fn add(
        &self,
        mut trigger: TTrigger,
        edge: TEdge,
        condition_resolution: ConditionResolution,
        defer: Option<DeferDirectiveArguments>,
    ) -> Result<Self, FederationError> {
        let ConditionResolution::Satisfied {
            path_tree: condition_path_tree,
            cost: condition_cost,
            context_map,
        } = condition_resolution
        else {
            return Err(FederationError::internal(
                "Cannot add an edge to a path if its conditions cannot be satisfied",
            ));
        };

        let mut edges = self.edges.clone();
        let mut edge_triggers = self.edge_triggers.clone();
        let mut edge_conditions = self.edge_conditions.clone();
        let mut matching_context_ids = self.matching_context_ids.clone();
        let mut arguments_to_context_usages = self.arguments_to_context_usages.clone();
        let mut last_subgraph_entering_edge_info = if defer.is_none() {
            self.last_subgraph_entering_edge_info.clone()
        } else {
            None
        };

        let Some(new_edge) = edge.into() else {
            edges.push(edge);
            edge_triggers.push(Arc::new(trigger));
            edge_conditions.push(condition_path_tree);
            matching_context_ids.push(None);
            arguments_to_context_usages.push(None);
            return Ok(GraphPath {
                graph: self.graph.clone(),
                head: self.head,
                tail: self.tail,
                edges,
                edge_triggers,
                edge_conditions,
                // We clear `last_subgraph_entering_edge_info` as we enter a `@defer`. That is
                // because `last_subgraph_entering_edge_info` is used to eliminate some non-optimal
                // paths, but we don't want those optimizations to bypass a `@defer` application.
                last_subgraph_entering_edge_info,
                runtime_types_of_tail: Arc::new(
                    self.graph
                        .advance_possible_runtime_types(&self.runtime_types_of_tail, None)?,
                ),
                runtime_types_before_tail_if_last_is_cast: None,
                defer_on_tail: defer,
                matching_context_ids,
                arguments_to_context_usages,
            });
        };

        let (edge_head, edge_tail) = self.graph.edge_endpoints(new_edge)?;
        let edge_weight = self.graph.edge_weight(new_edge)?;
        let tail_weight = self.graph.node_weight(self.tail)?;
        if self.tail != edge_head {
            return Err(FederationError::internal(format!(
                "Cannot add edge {edge_weight} to path ending at {tail_weight}"
            )));
        }
        if let Some(path_tree) = &condition_path_tree
            && edge_weight.conditions.is_none()
        {
            return Err(FederationError::internal(format!(
                "Unexpectedly got conditions paths {path_tree} for edge {edge_weight} without conditions",
            )));
        }

        if matches!(
            edge_weight.transition,
            QueryGraphEdgeTransition::Downcast { .. }
        ) && let Some(Some(last_edge)) = self.edges.last().map(|e| (*e).into())
        {
            let Some(last_edge_trigger) = self.edge_triggers.last() else {
                return Err(FederationError::internal(
                    "Could not find corresponding trigger for edge",
                ));
            };
            if let Some(OpPathElement::InlineFragment(last_operation_element)) =
                last_edge_trigger.get_op_path_element()
                && last_operation_element.directives.is_empty()
            {
                // This mean we have 2 typecasts back-to-back, and that means the
                // previous operation element might not be useful on this path. More
                // precisely, the previous typecast was only useful if it restricted the
                // possible runtime types of the type on which it applied more than the
                // current typecast does (but note that if the previous typecast had
                // directives, we keep it no matter what in case those directives are
                // important).
                //
                // That is, we're in the case where we have (somewhere potentially deep
                // in a query):
                //   f {  # field 'f' of type A
                //     ... on B {
                //       ... on C {
                //          # more stuff
                //       }
                //     }
                //   }
                // If the intersection of A and C is non empty and included (or equal)
                // to the intersection of A and B, then there is no reason to have
                // `... on B` at all because:
                //  1. you can do `... on C` on `f` directly since the intersection of A
                //     and C is non-empty.
                //  2. `... on C` restricts strictly more than `... on B` and so the
                //     latter can't impact the result.
                // So if we detect that we're in that situation, we remove the
                // `... on B` (but note that this is an optimization, keeping `... on B`
                // wouldn't be incorrect, just useless).
                let Some(runtime_types_before_tail) =
                    &self.runtime_types_before_tail_if_last_is_cast
                else {
                    return Err(FederationError::internal(
                        "Could not find runtime types of path prior to inline fragment",
                    ));
                };
                let new_runtime_types_of_tail = self
                    .graph
                    .advance_possible_runtime_types(runtime_types_before_tail, Some(new_edge))?;
                if !new_runtime_types_of_tail.is_empty()
                    && new_runtime_types_of_tail.is_subset(&self.runtime_types_of_tail)
                {
                    debug!(
                        "Previous cast {last_operation_element:?} is made obsolete by new cast {trigger:?}, removing from path."
                    );
                    // Note that `edge` starts at the node we wish to eliminate from the
                    // path. So we need to replace it with the edge going directly from
                    // the previous node to the new tail for this path.
                    //
                    // PORT_NOTE: The JS codebase has a bug where it doesn't check that
                    // the searched edges are downcast edges. We fix that here.
                    let (last_edge_head, _) = self.graph.edge_endpoints(last_edge)?;
                    let edge_tail_weight = self.graph.node_weight(edge_tail)?;
                    let mut new_edge = None;
                    for new_edge_ref in self.graph.out_edges(last_edge_head) {
                        if !matches!(
                            new_edge_ref.weight().transition,
                            QueryGraphEdgeTransition::Downcast { .. }
                        ) {
                            continue;
                        }
                        if self.graph.node_weight(new_edge_ref.target())?.type_
                            == edge_tail_weight.type_
                        {
                            new_edge = Some(new_edge_ref.id());
                            break;
                        }
                    }
                    if let Some(new_edge) = new_edge {
                        // We replace the previous operation element with this one.
                        edges.pop();
                        edge_triggers.pop();
                        edge_conditions.pop();
                        edges.push(new_edge.into());
                        edge_triggers.push(Arc::new(trigger));
                        edge_conditions.push(condition_path_tree);
                        return Ok(GraphPath {
                            graph: self.graph.clone(),
                            head: self.head,
                            tail: edge_tail,
                            edges,
                            edge_triggers,
                            edge_conditions,
                            last_subgraph_entering_edge_info: self
                                .last_subgraph_entering_edge_info
                                .clone(),
                            runtime_types_of_tail: Arc::new(new_runtime_types_of_tail),
                            runtime_types_before_tail_if_last_is_cast: self
                                .runtime_types_before_tail_if_last_is_cast
                                .clone(),
                            // We know the edge is a `DownCast`, so if there is no new
                            // `@defer` taking precedence, we just inherit the prior
                            // version.
                            defer_on_tail: if defer.is_some() {
                                defer
                            } else {
                                self.defer_on_tail.clone()
                            },
                            matching_context_ids: self.matching_context_ids.clone(),
                            arguments_to_context_usages: self.arguments_to_context_usages.clone(),
                        });
                    }
                }
            }
        }

        if matches!(
            edge_weight.transition,
            QueryGraphEdgeTransition::KeyResolution
        ) {
            // We're adding a `@key` edge. If the last edge to that point is an `@interfaceObject`
            // fake downcast, and if our destination type is not an `@interfaceObject` itself, then
            // we can eliminate that last edge as it does nothing useful (also, it has conditions
            // and we don't need/want the `@key` we're following to depend on those conditions,
            // since it doesn't have to).
            let edge_tail_weight = self.graph.node_weight(edge_tail)?;
            if self.last_edge_is_interface_object_fake_down_cast()?
                && matches!(
                    edge_tail_weight.type_,
                    QueryGraphNodeType::SchemaType(OutputTypeDefinitionPosition::Interface(_))
                )
            {
                // We replace the previous operation element with this one.
                edges.pop();
                edge_triggers.pop();
                edge_conditions.pop();
                edges.push(edge);
                edge_triggers.push(Arc::new(trigger));
                edge_conditions.push(condition_path_tree);
                if defer.is_none() && self.graph.is_cross_subgraph_edge(new_edge)? {
                    last_subgraph_entering_edge_info = Some(SubgraphEnteringEdgeInfo {
                        index: self.edges.len() - 1,
                        conditions_cost: condition_cost,
                    });
                }
                return Ok(GraphPath {
                    graph: self.graph.clone(),
                    head: self.head,
                    tail: edge_tail,
                    edges,
                    edge_triggers,
                    edge_conditions,
                    // Again, we don't want to set `last_subgraph_entering_edge_info` if we're
                    // entering a `@defer` (see above).
                    //
                    // PORT_NOTE: In the JS codebase, the information for the last subgraph-entering
                    // is set incorrectly, in that the index is off by one. We fix that bug here.
                    last_subgraph_entering_edge_info,
                    runtime_types_of_tail: Arc::new(self.graph.advance_possible_runtime_types(
                        &self.runtime_types_of_tail,
                        Some(new_edge),
                    )?),
                    // We know last edge is not a cast.
                    runtime_types_before_tail_if_last_is_cast: None,
                    defer_on_tail: defer,
                    matching_context_ids: self.matching_context_ids.clone(),
                    arguments_to_context_usages: self.arguments_to_context_usages.clone(),
                });
            }
        }

        let (new_edge_conditions, new_matching_context_ids, new_arguments_to_context_usages) =
            self.merge_edge_conditions_with_resolution(&condition_path_tree, &context_map);
        let last_arguments_to_context_usages = new_arguments_to_context_usages.last();

        if let Some(Some(last_arguments_to_context_usages)) = last_arguments_to_context_usages {
            // TODO: Perhaps it is better to explicitly cast this to `GraphPathTriggerRefMut` and
            // pull out the field from there.
            if let Some(field) = trigger.get_field_mut() {
                // We need to add the extra @fromContext arguments to the trigger, but its likely
                // pointing to a schema that doesn't have them, so we update the trigger to use the
                // appropriate subgraph schema and position first.
                let QueryGraphEdgeTransition::FieldCollection {
                    source,
                    field_definition_position,
                    ..
                } = &edge_weight.transition
                else {
                    bail!(
                        "Unexpectedly found field trigger for non-field edge {}",
                        edge_weight
                    );
                };
                field.schema = self.graph.schema_by_source(source)?.clone();
                field.field_position = field_definition_position.clone();

                // Now we can append the extra @fromContext arguments.
                let updated_field_arguments =
                    last_arguments_to_context_usages
                        .iter()
                        .map(|(argument_name, usage_entry)| {
                            Node::new(apollo_compiler::executable::Argument {
                                name: argument_name.clone(),
                                value: Node::new(Value::Variable(usage_entry.context_id.clone())),
                            })
                        });
                field.arguments = field
                    .arguments
                    .iter()
                    .cloned()
                    .chain(updated_field_arguments)
                    .collect();
            }
        }

        edges.push(edge);
        edge_triggers.push(Arc::new(trigger));
        if defer.is_none() && self.graph.is_cross_subgraph_edge(new_edge)? {
            last_subgraph_entering_edge_info = Some(SubgraphEnteringEdgeInfo {
                index: self.edges.len(),
                conditions_cost: condition_cost,
            });
        }
        Ok(GraphPath {
            graph: self.graph.clone(),
            head: self.head,
            tail: edge_tail,
            edges,
            edge_triggers,
            edge_conditions: new_edge_conditions,
            // Again, we don't want to set `last_subgraph_entering_edge_info` if we're entering a
            // `@defer` (see above).
            last_subgraph_entering_edge_info,
            runtime_types_of_tail: Arc::new(
                self.graph
                    .advance_possible_runtime_types(&self.runtime_types_of_tail, Some(new_edge))?,
            ),
            runtime_types_before_tail_if_last_is_cast: if matches!(
                edge_weight.transition,
                QueryGraphEdgeTransition::Downcast { .. }
            ) {
                Some(self.runtime_types_of_tail.clone())
            } else {
                None
            },
            // If there is no new `@defer` taking precedence, and the edge is downcast, then we
            // inherit the prior version. This is because we only try to re-enter subgraphs for
            // `@defer` on concrete fields, so as long as we add downcasts, we should remember that
            // we still need to try re-entering the subgraph.
            defer_on_tail: if defer.is_some() {
                defer
            } else if matches!(
                edge_weight.transition,
                QueryGraphEdgeTransition::Downcast { .. }
            ) {
                self.defer_on_tail.clone()
            } else {
                None
            },
            matching_context_ids: new_matching_context_ids,
            arguments_to_context_usages: new_arguments_to_context_usages,
        })
    }

    #[allow(clippy::type_complexity)]
    fn merge_edge_conditions_with_resolution(
        &self,
        condition_path_tree: &Option<Arc<OpPathTree>>,
        context_map: &Option<IndexMap<Name, ContextMapEntry>>,
    ) -> (
        Vec<Option<Arc<OpPathTree>>>,
        Vec<Option<IndexSet<Name>>>,
        Vec<Option<IndexMap<Name, ContextUsageEntry>>>,
    ) {
        let mut edge_conditions = self.edge_conditions.clone();
        let mut matching_context_ids = self.matching_context_ids.clone();
        let mut arguments_to_context_usages = self.arguments_to_context_usages.clone();

        edge_conditions.push(condition_path_tree.clone());
        matching_context_ids.push(None);
        if context_map.is_none() || context_map.as_ref().is_some_and(|m| m.is_empty()) {
            arguments_to_context_usages.push(None);
            (
                edge_conditions,
                matching_context_ids,
                arguments_to_context_usages,
            )
        } else {
            let mut new_arguments_to_context_usages = IndexMap::default();
            for (_, entry) in context_map.iter().flat_map(|map| map.iter()) {
                let idx = edge_conditions.len() - entry.levels_in_query_path - 1;

                if let Some(path_tree) = &entry.path_tree {
                    let merged_conditions = edge_conditions[idx]
                        .as_ref()
                        .map_or_else(|| path_tree.clone(), |condition| condition.merge(path_tree));
                    edge_conditions[idx] = Some(merged_conditions);
                }
                matching_context_ids[idx]
                    .get_or_insert_with(Default::default)
                    .insert(entry.context_id.clone());

                new_arguments_to_context_usages.insert(
                    entry.argument_name.clone(),
                    ContextUsageEntry {
                        context_id: entry.context_id.clone(),
                        relative_path: vec![
                            FetchDataPathElement::Parent;
                            entry.levels_in_data_path
                        ],
                        selection_set: entry.selection_set.clone(),
                        subgraph_argument_type: entry.argument_type.clone(),
                    },
                );
            }
            arguments_to_context_usages.push(Some(new_arguments_to_context_usages));
            (
                edge_conditions,
                matching_context_ids,
                arguments_to_context_usages,
            )
        }
    }

    pub(crate) fn head_node(&self) -> Result<&QueryGraphNode, FederationError> {
        self.graph.node_weight(self.head)
    }

    pub(crate) fn tail_node(&self) -> Result<&QueryGraphNode, FederationError> {
        self.graph.node_weight(self.tail)
    }

    pub(crate) fn iter(&self) -> impl Iterator<Item = GraphPathItem<'_, TTrigger, TEdge>> {
        debug_assert_eq!(self.edges.len(), self.edge_triggers.len());
        debug_assert_eq!(self.edges.len(), self.edge_conditions.len());
        debug_assert_eq!(self.edges.len(), self.matching_context_ids.len());
        debug_assert_eq!(self.edges.len(), self.arguments_to_context_usages.len());
        izip!(
            self.edges.iter().copied(),
            &self.edge_triggers,
            &self.edge_conditions,
            &self.matching_context_ids,
            &self.arguments_to_context_usages,
        )
        .map(
            |(edge, trigger, condition, matching_context_ids, arguments_to_context_usages)| {
                (
                    edge,
                    trigger,
                    condition,
                    matching_context_ids.as_ref(),
                    arguments_to_context_usages.as_ref(),
                )
            },
        )
    }

    pub(crate) fn next_edges(&self) -> Result<impl Iterator<Item = EdgeIndex>, FederationError> {
        let get_id = |edge_ref: EdgeReference<_>| edge_ref.id();

        if self.defer_on_tail.is_some() {
            // If the path enters a `@defer` (meaning that what comes after needs to be deferred),
            // then it's the one special case where we explicitly need to ask for edges to self,
            // as we will force the use of a `@key` edge (so we can send the non-deferred part
            // immediately) and we may have to resume the deferred part in the same subgraph than
            // the one in which we were (hence the need for edges to self).
            return Ok(Either::Left(
                self.graph
                    .out_edges_with_federation_self_edges(self.tail)
                    .into_iter()
                    .map(get_id),
            ));
        }

        // In theory, we could always return `self.graph.out_edges(self.tail)` here. But in
        // practice, `non_trivial_followup_edges` may give us a subset of those "out edges" that
        // avoids some of the edges that we know we don't need to check because they are guaranteed
        // to be inefficient after the last edge. Note that is purely an optimization (see
        // https://github.com/apollographql/federation/pull/1653 for more details).
        if let Some(last_edge) = self.edges.last()
            && let Some(last_edge) = (*last_edge).into()
        {
            let Some(non_trivial_followup_edges) =
                self.graph.non_trivial_followup_edges.get(&last_edge)
            else {
                return Err(FederationError::internal(format!(
                    "Unexpectedly missing entry for {last_edge} in non-trivial followup edges map",
                    last_edge = EdgeIndexDisplay::new(last_edge, &self.graph)
                )));
            };
            return Ok(Either::Right(non_trivial_followup_edges.iter().copied()));
        }

        Ok(Either::Left(
            self.graph.out_edges(self.tail).into_iter().map(get_id),
        ))
    }

    fn is_on_top_level_query_root(&self) -> Result<bool, FederationError> {
        let head_weight = self.graph.node_weight(self.head)?;
        if !matches!(head_weight.type_, QueryGraphNodeType::FederatedRootType(_)) {
            return Ok(false);
        }

        // We walk the path's edges and as soon as we take a field (or the node is not a federated
        // root or a subgraph root), we know we're not on the top-level query/mutation/subscription
        // root anymore. The reason we don't just check that size <= 1 is that we could have a
        // top-level `... on Query` inline fragment that doesn't actually change the type.
        for edge in &self.edges {
            let Some(edge) = (*edge).into() else {
                continue;
            };

            let edge_weight = self.graph.edge_weight(edge)?;
            if matches!(
                edge_weight.transition,
                QueryGraphEdgeTransition::FieldCollection { .. }
            ) {
                return Ok(false);
            }

            let (_, tail) = self.graph.edge_endpoints(edge)?;
            let tail_weight = self.graph.node_weight(tail)?;
            let QueryGraphNodeType::SchemaType(tail_type_pos) = &tail_weight.type_ else {
                return Err(FederationError::internal(
                    "Edge tail is unexpectedly a federated root",
                ));
            };

            let tail_schema = self.graph.schema_by_source(&tail_weight.source)?;
            let tail_schema_definition = &tail_schema.schema().schema_definition;
            if let Some(query_type_name) = &tail_schema_definition.query
                && tail_type_pos.type_name() == &query_type_name.name
            {
                continue;
            }
            if let Some(mutation_type_name) = &tail_schema_definition.mutation
                && tail_type_pos.type_name() == &mutation_type_name.name
            {
                continue;
            }
            if let Some(subscription_type_name) = &tail_schema_definition.subscription
                && tail_type_pos.type_name() == &subscription_type_name.name
            {
                continue;
            }
            return Ok(false);
        }
        Ok(true)
    }

    fn tail_is_interface_object(&self) -> Result<bool, FederationError> {
        let tail_weight = self.graph.node_weight(self.tail)?;

        let QueryGraphNodeType::SchemaType(OutputTypeDefinitionPosition::Object(
            tail_type_position,
        )) = &tail_weight.type_
        else {
            return Ok(false);
        };

        let subgraph_schema = self.graph.schema_by_source(&tail_weight.source)?;
        let federation_spec_definition =
            get_federation_spec_definition_from_subgraph(subgraph_schema)?;
        let Some(interface_object_directive_definition) =
            federation_spec_definition.interface_object_directive_definition(subgraph_schema)?
        else {
            return Ok(false);
        };
        let tail_type = tail_type_position.get(subgraph_schema.schema())?;
        Ok(tail_type
            .directives
            .iter()
            .any(|directive| directive.name == interface_object_directive_definition.name))
    }

    fn last_edge_is_interface_object_fake_down_cast(&self) -> Result<bool, FederationError> {
        let Some(last_edge) = self.edges.last() else {
            return Ok(false);
        };
        let Some(last_edge) = (*last_edge).into() else {
            return Ok(false);
        };
        let last_edge_weight = self.graph.edge_weight(last_edge)?;
        Ok(matches!(
            last_edge_weight.transition,
            QueryGraphEdgeTransition::InterfaceObjectFakeDownCast { .. }
        ))
    }

    // PORT_NOTE: In the JS codebase, this was named
    // `lastIsIntefaceObjectFakeDownCastAfterEnteringSubgraph`.
    fn last_edge_is_interface_object_fake_down_cast_after_entering_subgraph(
        &self,
    ) -> Result<bool, FederationError> {
        Ok(self.last_edge_is_interface_object_fake_down_cast()?
            && self
            .last_subgraph_entering_edge_info
            .as_ref()
            .map(|edge_info| edge_info.index)
            // `len - 1` is the last index (the fake down cast), so `len - 2` is the previous edge.
            == Some(self.edges.len() - 2))
    }

    /*
    #[cfg_attr(
        feature = "snapshot_tracing",
        tracing::instrument(skip_all, level = "trace", name = "GraphPath::can_satisfy_conditions")
    )]
    */
    fn can_satisfy_conditions(
        &self,
        edge: EdgeIndex,
        condition_resolver: &mut impl ConditionResolver,
        context: &OpGraphPathContext,
        excluded_destinations: &ExcludedDestinations,
        excluded_conditions: &ExcludedConditions,
    ) -> Result<ConditionResolution, FederationError> {
        let edge_weight = self.graph.edge_weight(edge)?;
        if edge_weight.conditions.is_none() && edge_weight.required_contexts.is_empty() {
            return Ok(ConditionResolution::no_conditions());
        }

        /* Resolve context conditions */

        let mut total_cost = 0.;
        let mut context_map: IndexMap<Name, ContextMapEntry> = IndexMap::default();

        if !edge_weight.required_contexts.is_empty() {
            let mut was_unsatisfied = false;
            for ctx in &edge_weight.required_contexts {
                let mut levels_in_data_path = 0;
                for (mut levels_in_query_path, (e, trigger)) in self
                    .edges
                    .iter()
                    .zip(self.edge_triggers.iter())
                    .rev()
                    .enumerate()
                {
                    let parent_type = trigger.get_field_parent_type();
                    levels_in_query_path += 1;
                    if parent_type.is_some() {
                        levels_in_data_path += 1;
                    }
                    let Some(e) = (*e).into() else { continue };
                    if was_unsatisfied || context_map.contains_key(&ctx.argument_name) {
                        continue;
                    }
                    // There's a context match if in the supergraph schema, the field's parent type
                    // is equal to or a subtype of one of the @context types.
                    let Some(parent_type) = parent_type else {
                        continue;
                    };
                    let supergraph_schema = self.graph.supergraph_schema()?;
                    let parent_type_in_supergraph: CompositeTypeDefinitionPosition =
                        supergraph_schema
                            .get_type(parent_type.type_name().clone())?
                            .try_into()?;
                    if !ctx.types_with_context_set.iter().fallible_any(|pos| {
                        if pos.type_name() == parent_type_in_supergraph.type_name() {
                            return Ok::<_, FederationError>(true);
                        }
                        match &parent_type_in_supergraph {
                            CompositeTypeDefinitionPosition::Object(parent_type_in_supergraph) => {
                                if parent_type_in_supergraph
                                    .get(supergraph_schema.schema())?
                                    .implements_interfaces
                                    .iter()
                                    .any(|item| &item.name == pos.type_name())
                                {
                                    return Ok(true);
                                }
                            }
                            CompositeTypeDefinitionPosition::Interface(
                                parent_type_in_supergraph,
                            ) => {
                                if parent_type_in_supergraph
                                    .get(supergraph_schema.schema())?
                                    .implements_interfaces
                                    .iter()
                                    .any(|item| &item.name == pos.type_name())
                                {
                                    return Ok(true);
                                }
                            }
                            _ => {}
                        }
                        let pos_in_supergraph: CompositeTypeDefinitionPosition = supergraph_schema
                            .get_type(pos.type_name().clone())?
                            .try_into()?;
                        if let CompositeTypeDefinitionPosition::Union(pos_in_supergraph) =
                            &pos_in_supergraph
                            && pos_in_supergraph
                                .get(supergraph_schema.schema())?
                                .members
                                .iter()
                                .any(|item| &item.name == parent_type_in_supergraph.type_name())
                        {
                            return Ok(true);
                        }
                        Ok(false)
                    })? {
                        continue;
                    }

                    // We have a match, so parse the selection set against the field's parent type
                    // in the supergraph schema.
                    let mut selection_set = parse_field_value_without_validation(
                        &supergraph_schema,
                        parent_type_in_supergraph.type_name().clone(),
                        &ctx.selection,
                    )?;

                    // In the current version of @context (v0.1), type conditions (if they appear at
                    // all) are only allowed at top-level, and are only allowed to reference object
                    // types. Due to duck-typing semantics, there may be type conditions that have
                    // an empty intersection in possible runtime types with their parent type, which
                    // means we need to remove those type conditions for the selection set to be
                    // considered valid GraphQL.
                    let possible_runtime_types =
                        supergraph_schema.possible_runtime_types(parent_type_in_supergraph)?;
                    selection_set.selection_set.selections = selection_set
                        .selection_set
                        .selections
                        .into_iter()
                        .map(|selection| {
                            let apollo_compiler::executable::Selection::InlineFragment(
                                inline_fragment,
                            ) = &selection
                            else {
                                return Ok::<_, FederationError>(Some(selection));
                            };
                            let Some(type_condition) = &inline_fragment.type_condition else {
                                return Ok(Some(selection));
                            };
                            let type_condition_pos: ObjectTypeDefinitionPosition =
                                supergraph_schema
                                    .get_type(type_condition.clone())?
                                    .try_into()?;
                            if possible_runtime_types.contains(&type_condition_pos) {
                                return Ok(Some(selection));
                            }
                            Ok(None)
                        })
                        .process_results(|r| r.flatten().collect())?;

                    // The field set should be valid now, so we'll validate and convert to our
                    // selection set representation.
                    let selection_set = validate_field_value(&supergraph_schema, selection_set)?;
                    let resolution = condition_resolver.resolve(
                        e,
                        context,
                        excluded_destinations,
                        excluded_conditions,
                        Some(&selection_set),
                    )?;
                    let context_id = self.graph.context_id_by_source_and_argument(
                        &ctx.subgraph_name,
                        &ctx.argument_coordinate,
                    )?;
                    match &resolution {
                        ConditionResolution::Satisfied {
                            cost, path_tree, ..
                        } => {
                            total_cost += cost;
                            let entry = ContextMapEntry {
                                levels_in_data_path,
                                levels_in_query_path,
                                path_tree: path_tree.clone(),
                                selection_set,
                                argument_name: ctx.argument_name.clone(),
                                argument_type: ctx.argument_type.clone(),
                                context_id: context_id.clone(),
                            };
                            context_map.insert(ctx.argument_name.clone(), entry);
                        }
                        ConditionResolution::Unsatisfied { .. } => was_unsatisfied = true,
                    }
                }
            }

            if edge_weight
                .required_contexts
                .iter()
                .any(|ctx| !context_map.contains_key(&ctx.argument_name))
            {
                debug!("@fromContext requires a context that is not set in graph path");
                return Ok(ConditionResolution::Unsatisfied {
                    reason: Some(UnsatisfiedConditionReason::NoSetContext),
                });
            }

            if was_unsatisfied {
                debug!("@fromContext selection set is unsatisfied");
                return Ok(ConditionResolution::Unsatisfied { reason: None });
            }

            // it's possible that we will need to create a new fetch group at this point, in which
            // case we'll need to collect the keys to jump back to this object as a precondition
            // for satisfying it.
            let (edge_head, _) = self.graph.edge_endpoints(edge)?;
            if self.graph.get_locally_satisfiable_key(edge_head)?.is_none() {
                debug!("Post-context conditions cannot be satisfied");
                return Ok(ConditionResolution::Unsatisfied {
                    reason: Some(UnsatisfiedConditionReason::NoPostRequireKey),
                });
            }

            if edge_weight.conditions.is_none() {
                return Ok(ConditionResolution::Satisfied {
                    cost: total_cost,
                    path_tree: None,
                    context_map: Some(context_map),
                });
            }
        }

        /* Resolve all other conditions */

        debug_span!("Checking conditions {conditions} on edge {edge_weight}");
        let mut resolution = condition_resolver.resolve(
            edge,
            context,
            excluded_destinations,
            excluded_conditions,
            None,
        )?;
        if matches!(resolution, ConditionResolution::Unsatisfied { .. }) {
            return Ok(ConditionResolution::Unsatisfied { reason: None });
        }
        if let Some(Some(last_edge)) = self.edges.last().map(|e| (*e).into())
            && matches!(
                edge_weight.transition,
                QueryGraphEdgeTransition::FieldCollection { .. }
            )
        {
            let last_edge_weight = self.graph.edge_weight(last_edge)?;
            if !matches!(
                last_edge_weight.transition,
                QueryGraphEdgeTransition::KeyResolution
            ) {
                let in_same_subgraph = if let ConditionResolution::Satisfied {
                    path_tree: Some(path_tree),
                    ..
                } = &resolution
                {
                    path_tree.is_all_in_same_subgraph()?
                } else {
                    true
                };
                if in_same_subgraph {
                    debug!("@requires conditions are satisfied, but validating post-require key.");
                    let (edge_head, _) = self.graph.edge_endpoints(edge)?;
                    if self.graph.get_locally_satisfiable_key(edge_head)?.is_none() {
                        debug!("Post-require conditions cannot be satisfied");
                        return Ok(ConditionResolution::Unsatisfied {
                            reason: Some(UnsatisfiedConditionReason::NoPostRequireKey),
                        });
                    };
                    // We're in a case where we have an `@requires` application (we have
                    // conditions and the new edge has a `FieldCollection` transition) and we
                    // have to jump to another subgraph to satisfy the `@requires`, which means
                    // we need to use a key on "the current subgraph" to resume collecting the
                    // field with the `@requires`. `get_locally_satisfiable_key()` essentially
                    // tells us that we have such key, and that's good enough here. Note that
                    // the way the code is organised, we don't use an actual edge of the query
                    // graph, so we cannot use `condition_resolver` and so it's not easy to get
                    // a proper cost or tree. That's ok in the sense that the cost of the key is
                    // negligible because we know it's a "local" one (there is no subgraph jump)
                    // and the code to build plan will deal with adding that key anyway (so not
                    // having the tree is ok).
                    // TODO(Sylvain): the whole handling of `@requires` is a bit too complex and
                    // hopefully we might be able to clean that up, but it's unclear to me how
                    // at the moment and it may not be a small change so this will have to do
                    // for now.
                }
            }
        }
        if let ConditionResolution::Satisfied {
            cost,
            context_map: ctx_map,
            ..
        } = &mut resolution
        {
            *cost += total_cost;
            *ctx_map = Some(context_map);
        }
        snapshot!(
            "ConditionResolution",
            resolution.to_string(),
            "Condition resolution"
        );
        Ok(resolution)
    }

    // TODO: We've skipped populating `Unadvanceables` information because it's only needed during
    // composition, but we'll need to port that code when we port composition.
    // PORT_NOTE: In the JS codebase, this was named
    // `advancePathWithNonCollectingAndTypePreservingTransitions`.
    #[cfg_attr(
        feature = "snapshot_tracing",
        tracing::instrument(skip_all, level = "trace")
    )]
    #[allow(clippy::too_many_arguments)]
    fn advance_with_non_collecting_and_type_preserving_transitions<TDeadEnds>(
        self: &Arc<Self>,
        context: &OpGraphPathContext,
        condition_resolver: &mut impl ConditionResolver,
        excluded_destinations: &ExcludedDestinations,
        excluded_conditions: &ExcludedConditions,
        override_conditions: &OverrideConditions,
        transition_and_context_to_trigger: impl Fn(
            &QueryGraphEdgeTransition,
            &OpGraphPathContext,
        ) -> TTrigger,
        node_and_trigger_to_edge: impl Fn(
            &Arc<QueryGraph>,
            NodeIndex,
            &Arc<TTrigger>,
            &OverrideConditions,
        ) -> Result<Option<TEdge>, FederationError>,
        disabled_subgraphs: &IndexSet<Arc<str>>,
    ) -> Result<IndirectPaths<TTrigger, TEdge, TDeadEnds>, FederationError>
    where
        UnadvanceableClosures: Into<TDeadEnds>,
    {
        // If we're asked for indirect paths after an "@interfaceObject fake down cast" but that
        // down cast comes just after non-collecting edge(s), then we can ignore the ask (skip
        // indirect paths from there). The reason is that the presence of the non-collecting edges
        // just before the fake down cast means we looked at indirect paths just before that
        // down cast, but that fake down cast really does nothing in practice with the subgraph it's
        // on, so any indirect path from that fake down cast will have a valid indirect path
        // before it, and so will have been taken into account independently.
        if self.last_edge_is_interface_object_fake_down_cast_after_entering_subgraph()? {
            let path = self.clone();
            return Ok(IndirectPaths {
                paths: Arc::new(vec![]),
                dead_ends: UnadvanceableClosures(Arc::new(vec![
                    Arc::new(LazyLock::new(Box::new(move || {
                        let path_tail_weight = path.graph.node_weight(path.tail)?;
                        let reachable_subgraphs = path.next_edges()?.map(|edge| {
                            let edge_weight = path.graph.edge_weight(edge)?;
                            let (_, edge_tail) = path.graph.edge_endpoints(edge)?;
                            let edge_tail_weight = path.graph.node_weight(edge_tail)?;
                            Ok::<_, FederationError>(
                                if !edge_weight.transition.collect_operation_elements()
                                    && edge_tail_weight.source != path_tail_weight.source
                                {
                                    Some(edge_tail_weight.source.clone())
                                } else {
                                    None
                                }
                            )
                        }).process_results(
                            |iter| iter.flatten().collect::<IndexSet<_>>()
                        )?;
                        Ok(Unadvanceables(Arc::new(reachable_subgraphs.into_iter().map(|subgraph| {
                            Arc::new(Unadvanceable {
                                reason: UnadvanceableReason::IgnoredIndirectPath,
                                from_subgraph: path_tail_weight.source.clone(),
                                to_subgraph: subgraph.clone(),
                                details: format!(
                                    "ignoring moving from \"{}\" to \"{}\" as a more direct option exists",
                                    path_tail_weight.source,
                                    subgraph,
                                ),
                            })
                        }).collect())))
                    })))
                ])).into(),
            });
        }

        let is_top_level_path = self.is_on_top_level_query_root()?;
        let tail_weight = self.graph.node_weight(self.tail)?;
        let tail_type_pos = if let QueryGraphNodeType::SchemaType(type_) = &tail_weight.type_ {
            Some(type_)
        } else {
            None
        };
        let original_source = tail_weight.source.clone();
        // For each source, we store the best path we find for that source with the score, or `None`
        // if we can decide that we should not try going to that source (typically because we can
        // prove that this create an inefficient detour for which a more direct path exists and will
        // be found).
        type BestPathInfo<TTrigger, TEdge> =
            Option<(Arc<GraphPath<TTrigger, TEdge>>, QueryPlanCost)>;
        let mut best_path_by_source: IndexMap<Arc<str>, BestPathInfo<TTrigger, TEdge>> =
            IndexMap::default();
        let mut dead_end_closures: Vec<UnadvanceableClosure> = vec![];
        // Note that through `excluded` we avoid taking the same edge from multiple options. But
        // that means it's important we try the smallest paths first. That is, if we could in theory
        // have path A -> B and A -> C -> B, and we can do B -> D, then we want to keep A -> B -> D,
        // not A -> C -> B -> D.
        let mut heap: MaxHeap<HeapElement<TTrigger, TEdge>> = MaxHeap::new();
        heap.push(HeapElement(self.clone()));

        while let Some(HeapElement(to_advance)) = heap.pop() {
            debug!("From {to_advance}");
            let span = debug_span!(" |");
            let _guard = span.enter();
            let mut does_non_collecting_next_edge_exist = false;
            for edge in to_advance.next_edges()? {
                let edge_weight = self.graph.edge_weight(edge)?;
                if edge_weight.transition.collect_operation_elements() {
                    continue;
                }
                does_non_collecting_next_edge_exist = true;
                debug!(
                    "Testing edge {edge}",
                    edge = EdgeIndexDisplay::new(edge, &self.graph)
                );
                let span = debug_span!(" |");
                let _guard = span.enter();
                let (edge_head, edge_tail) = self.graph.edge_endpoints(edge)?;
                let edge_tail_weight = self.graph.node_weight(edge_tail)?;

                if excluded_destinations.is_excluded(&edge_tail_weight.source) {
                    debug!("Ignored: edge is excluded");
                    continue;
                }

                if disabled_subgraphs.contains(&edge_tail_weight.source) {
                    debug!("Ignored: subgraph is disabled");
                    continue;
                }

                // If the edge takes us back to the subgraph in which we started, we're not really
                // interested (we've already checked for a direct transition from that original
                // subgraph). One exception though is if we're just after a @defer, in which case
                // re-entering the current subgraph is actually useful.
                if edge_tail_weight.source == original_source && to_advance.defer_on_tail.is_none()
                {
                    debug!("Ignored: edge get us back to our original source");
                    continue;
                }

                // We have edges between Query objects so that if a field returns a query object, we
                // can jump to any subgraph at that point. However, there is no point of using those
                // edges at the beginning of a path, except for when we have a @defer, in which case
                // we want to allow re-entering the same subgraph.
                if is_top_level_path
                    && matches!(
                        edge_weight.transition,
                        QueryGraphEdgeTransition::RootTypeResolution { .. }
                            | QueryGraphEdgeTransition::KeyResolution
                    )
                    && !(to_advance.defer_on_tail.is_some()
                        && self.graph.is_self_key_or_root_edge(edge)?)
                {
                    debug!(
                        r#"Ignored: edge is a top-level "RootTypeResolution" or "KeyResolution""#
                    );
                    continue;
                }

                let prev_for_source = best_path_by_source.get(&edge_tail_weight.source);
                let prev_for_source = match prev_for_source {
                    Some(Some(prev_for_source)) => Some(prev_for_source),
                    Some(None) => {
                        debug!(
                            "Ignored: we've shown before than going to {original_source:?} is not productive"
                        );
                        continue;
                    }
                    None => None,
                };

                if let Some(prev_for_source) = prev_for_source
                    && ((prev_for_source.0.edges.len() < to_advance.edges.len() + 1)
                        || (prev_for_source.0.edges.len() == to_advance.edges.len() + 1
                            && prev_for_source.1 <= 1.0))
                {
                    debug!("Ignored: a better (shorter) path to the same subgraph already added");
                    // We've already found another path that gets us to the same subgraph rather
                    // than the edge we're about to check. If that previous path is strictly
                    // shorter than the path we'd obtain with the new edge, then we don't
                    // consider this edge (it's a longer way to get to the same place). And if
                    // the previous path is the same size (as the one obtained with that edge),
                    // but that previous path's cost for getting the condition was 0 or 1, then
                    // the new edge cannot really improve on this and we don't bother with it.
                    //
                    // Note that a cost of 0 can only happen during composition validation where
                    // all costs are 0 to mean "we don't care about costs". This effectively
                    // means that for validation, as soon as we have a path to a subgraph, we
                    // ignore other options even if they may be "faster".
                    continue;
                }

                if excluded_conditions.is_excluded(edge_weight.conditions.as_ref()) {
                    debug!("Ignored: edge condition is excluded");
                    continue;
                }

                debug!("Validating conditions {edge_weight}");
                let span = debug_span!(" |");
                let guard = span.enter();
                // As we validate the condition for this edge, it might be necessary to jump to
                // another subgraph, but if for that we need to jump to the same subgraph we're
                // trying to get to, then it means there is another, shorter way to go to our
                // destination and we can return that shorter path, not the one with the edge
                // we're trying.
                let condition_resolution = to_advance.can_satisfy_conditions(
                    edge,
                    condition_resolver,
                    context,
                    &excluded_destinations.add_excluded(&edge_tail_weight.source),
                    excluded_conditions,
                )?;

                let ConditionResolution::Satisfied {
                    path_tree,
                    cost,
                    context_map,
                } = condition_resolution
                else {
                    drop(guard);
                    debug!("Condition unsatisfiable");
                    let to_advance = to_advance.clone();
                    dead_end_closures.push(Arc::new(LazyLock::new(Box::new(move || {
                        let to_advance_tail_weight = to_advance.graph.node_weight(to_advance.tail)?;
                        let edge_weight = to_advance.graph.edge_weight(edge)?;
                        let (edge_head, edge_tail) = to_advance.graph.edge_endpoints(edge)?;
                        let edge_head_weight = to_advance.graph.node_weight(edge_head)?;
                        let edge_tail_weight = to_advance.graph.node_weight(edge_tail)?;
                        let from_subgraph = &to_advance_tail_weight.source;
                        let to_subgraph = &edge_tail_weight.source;
                        let from_subgraph_schema = to_advance.graph.schema_by_source(from_subgraph)?;
                        let Some(conditions) = &edge_weight.conditions else {
                            bail!("Unsatisfiable conditions unexpectedly missing on edge");
                        };
                        let has_overridden_field = Self::condition_has_overridden_fields_in_subgraph(
                            from_subgraph_schema,
                            conditions,
                        )?;
                        let extra_message = if has_overridden_field {
                            format!(
                                " (note that some of those key fields are overridden in \"{from_subgraph}\")",
                            )
                        } else {
                            "".to_owned()
                        };
                        Ok(Unadvanceables(Arc::new(vec![Arc::new(Unadvanceable {
                            reason: UnadvanceableReason::UnsatisfiableKeyCondition,
                            from_subgraph: from_subgraph.clone(),
                            to_subgraph: to_subgraph.clone(),
                            details: format!(
                                "cannot move to subgraph \"{}\" using @key(fields: \"{}\") of \"{}\", the key field(s) cannot be resolved from subgraph \"{}\"{}",
                                to_subgraph,
                                FieldSetDisplay(conditions),
                                edge_head_weight.type_,
                                from_subgraph,
                                extra_message
                            ),
                        })])))
                    }))));
                    continue;
                };
                drop(guard);
                debug!("Condition satisfied");

                // We can get to `edge_tail_weight.source` with that edge. But if we had already
                // found another path to the same subgraph, we want to replace it with this one only
                // if either 1) it is shorter or 2) if it's of equal size, only if the condition
                // cost is lower than the previous one.
                if let Some(prev_for_source) = prev_for_source
                    && prev_for_source.0.edges.len() == to_advance.edges.len() + 1
                    && prev_for_source.1 <= cost
                {
                    debug!(
                        "Ignored: a better (less costly) path to the same subgraph already added"
                    );
                    continue;
                }

                // It's important we minimize the number of options this method returns, because
                // during query planning with many fields, options here translate to state
                // explosion. This is why above we eliminated edges that provably have better
                // options.
                //
                // But we can do a slightly more involved check. Suppose we have a few subgraphs A,
                // B and C, and suppose that we're considering an edge from B to C. We can then look
                // at which subgraph we were in before reaching B (which can be "none" if the query
                // starts at B), and let say that it is A. In other words, if we use the edge we're
                // considering, we'll be looking at a path like:
                //   ... -> A -> B -> <some fields in B> -> C
                //
                // Now, we can fairly easily check if the fields we collected in B (the `<some
                // fields in B>`) can be also collected directly (without keys, nor requires) from
                // A and if after that we could take an edge to C. If we can do all that, then we
                // know that the path we're considering is strictly less efficient than:
                //   ... -> A -> <same fields but in A> -> C
                //
                // Furthermore, since we've confirmed it's a valid path, it will be found by another
                // branch of the algorithm. In that case, we can ignore the edge to C, knowing a
                // better path exists. Doing this drastically reduces state explosion in a number of
                // cases.
                if let Some(last_subgraph_entering_edge_info) =
                    &to_advance.last_subgraph_entering_edge_info
                {
                    let Some(last_subgraph_entering_edge) =
                        to_advance.edges[last_subgraph_entering_edge_info.index].into()
                    else {
                        return Err(FederationError::internal(
                            "Subgraph-entering edge is unexpectedly absent",
                        ));
                    };

                    let (last_subgraph_entering_edge_head, last_subgraph_entering_edge_tail) =
                        self.graph.edge_endpoints(last_subgraph_entering_edge)?;
                    let last_subgraph_entering_edge_tail_weight =
                        self.graph.node_weight(last_subgraph_entering_edge_tail)?;
                    let QueryGraphNodeType::SchemaType(last_subgraph_entering_edge_tail_type_pos) =
                        &last_subgraph_entering_edge_tail_weight.type_
                    else {
                        return Err(FederationError::internal(
                            "Subgraph-entering edge tail is unexpectedly a federated root",
                        ));
                    };
                    if Some(last_subgraph_entering_edge_tail_type_pos) != tail_type_pos {
                        let last_subgraph_entering_edge_weight =
                            self.graph.edge_weight(last_subgraph_entering_edge)?;

                        // If the previous subgraph is an actual subgraph, the head of the last
                        // subgraph-entering edge would be where a direct path starts. If the
                        // previous subgraph is a federated root, we instead take the previous
                        // subgraph to be the destination subgraph of this edge, and that subgraph's
                        // root of the same root kind (if it exists) would be where a direct path
                        // starts.
                        let direct_path_start_node = if matches!(
                            last_subgraph_entering_edge_weight.transition,
                            QueryGraphEdgeTransition::SubgraphEnteringTransition
                        ) {
                            let root = to_advance.head;
                            let root_weight = self.graph.node_weight(root)?;
                            let QueryGraphNodeType::FederatedRootType(root_kind) =
                                &root_weight.type_
                            else {
                                return Err(FederationError::internal(
                                    "Encountered non-root path with a subgraph-entering transition",
                                ));
                            };
                            // Since mutation options need to originate from the same subgraph, we
                            // pretend we cannot find a root node  in another subgraph (effectively
                            // skipping the optimization).
                            if *root_kind == SchemaRootDefinitionKind::Mutation {
                                None
                            } else {
                                self.graph
                                    .root_kinds_to_nodes_by_source(&edge_tail_weight.source)?
                                    .get(root_kind)
                                    .copied()
                            }
                        } else {
                            Some(last_subgraph_entering_edge_head)
                        };

                        // If the previous subgraph is a federated root, as noted above we take the
                        // previous subgraph to instead be the destination subgraph of this edge, so
                        // we must manually indicate that here.
                        let is_edge_to_previous_subgraph = if matches!(
                            last_subgraph_entering_edge_weight.transition,
                            QueryGraphEdgeTransition::SubgraphEnteringTransition
                        ) {
                            true
                        } else {
                            let last_subgraph_entering_edge_head_weight =
                                self.graph.node_weight(last_subgraph_entering_edge_head)?;
                            last_subgraph_entering_edge_head_weight.source
                                == edge_tail_weight.source
                        };

                        let direct_path_end_node =
                            if let Some(direct_path_start_node) = direct_path_start_node {
                                let QueryGraphNodeType::SchemaType(edge_tail_type_pos) =
                                    &edge_tail_weight.type_
                                else {
                                    return Err(FederationError::internal(
                                        "Edge tail is unexpectedly a federated root",
                                    ));
                                };
                                to_advance.check_direct_path_from_node(
                                    last_subgraph_entering_edge_info.index + 1,
                                    direct_path_start_node,
                                    edge_tail_type_pos,
                                    &node_and_trigger_to_edge,
                                    override_conditions,
                                )?
                            } else {
                                None
                            };

                        if let Some(direct_path_end_node) = direct_path_end_node {
                            let direct_key_edge_max_cost = last_subgraph_entering_edge_info
                                .conditions_cost
                                + if is_edge_to_previous_subgraph {
                                    0.0
                                } else {
                                    cost
                                };
                            if is_edge_to_previous_subgraph
                                || self.graph.has_satisfiable_direct_key_edge(
                                    direct_path_end_node,
                                    &edge_tail_weight.source,
                                    condition_resolver,
                                    direct_key_edge_max_cost,
                                )?
                            {
                                debug!(
                                    "Ignored: edge correspond to a detour by subgraph {} from subgraph {:?}: ",
                                    edge_tail_weight.source,
                                    self.graph
                                        .node_weight(last_subgraph_entering_edge_head)?
                                        .source
                                );
                                debug!(
                                    "we have a direct path from {} to {} in {}.",
                                    self.graph
                                        .node_weight(last_subgraph_entering_edge_head)?
                                        .type_,
                                    edge_tail_weight.type_,
                                    self.graph
                                        .node_weight(last_subgraph_entering_edge_head)?
                                        .source
                                );
                                if !is_edge_to_previous_subgraph {
                                    debug!(
                                        "And, it can move to {} from there",
                                        edge_tail_weight.source
                                    );
                                }
                                // We just found that going to the previous subgraph is useless
                                // because there is a more direct path. But we additionally record
                                // that this previous subgraph should be avoided altogether because
                                // some other longer path could try to get back to that same source
                                // but defeat this specific check due to having taken another edge
                                // first (and thus the last subgraph-entering edge is different).
                                //
                                // What we mean here is that if `to_advance` path is
                                //   ... -> A -> B -> <some fields in B>
                                // and we just found that we don't want to keep
                                //   ... -> A -> B -> <some fields in B> -> A
                                // because we know
                                //   ... -> A -> <some fields in A>
                                // is possible directly, then we don't want this method to later add
                                //   ... -> A -> B -> <some fields in B> -> C -> A
                                // as that is equally not useful.
                                best_path_by_source.insert(edge_tail_weight.source.clone(), None);
                                // We also record a dead-end because this optimization might make us
                                // return no path at all, and having recorded no-dead ends would
                                // break an assertion in `advance_with_transition()` that assumes
                                // that if we have recorded no-dead end, that's because we have no
                                // key edges. But note that this "dead end" message shouldn't really
                                // ever reach users.
                                let to_advance = to_advance.clone();
                                dead_end_closures.push(Arc::new(LazyLock::new(Box::new(move || {
                                    let to_advance_tail_weight =
                                        to_advance.graph.node_weight(to_advance.tail)?;
                                    let edge_weight = to_advance.graph.edge_weight(edge)?;
                                    let (edge_head, edge_tail) =
                                        to_advance.graph.edge_endpoints(edge)?;
                                    let edge_head_weight = to_advance.graph.node_weight(edge_head)?;
                                    let edge_tail_weight = to_advance.graph.node_weight(edge_tail)?;
                                    let Some(conditions) = &edge_weight.conditions else {
                                        bail!("Conditions unexpectedly missing on edge");
                                    };
                                    Ok(Unadvanceables(Arc::new(vec![Arc::new(Unadvanceable {
                                        reason: UnadvanceableReason::IgnoredIndirectPath,
                                        from_subgraph: to_advance_tail_weight.source.clone(),
                                        to_subgraph: edge_tail_weight.source.clone(),
                                        details: format!(
                                            "ignoring moving to subgraph \"{}\" using @key(fields: \"{}\") of \"{}\" because there is a more direct path in {} that avoids {} altogether",
                                            edge_tail_weight.source,
                                            FieldSetDisplay(conditions),
                                            edge_head_weight.type_,
                                            edge_tail_weight.source,
                                            to_advance_tail_weight.source,
                                        ),
                                    })])))
                                }))));
                                continue;
                            }
                        }
                    }
                }

                let updated_path = Arc::new(to_advance.add(
                    transition_and_context_to_trigger(&edge_weight.transition, context),
                    edge.into(),
                    ConditionResolution::Satisfied {
                        cost,
                        path_tree,
                        context_map,
                    },
                    None,
                )?);
                best_path_by_source.insert(
                    edge_tail_weight.source.clone(),
                    Some((updated_path.clone(), cost)),
                );
                // It can be necessary to "chain" keys, because different subgraphs may have
                // different keys exposed, and so we when we took a key, we want to check if there
                // is a new key we can now use that takes us to other subgraphs. For other
                // non-collecting edges ('RootTypeResolution' and 'SubgraphEnteringTransition')
                // however, chaining never give us additional value.
                //
                // One exception is the case of self-edges (which stay on the same node), as those
                // will only be looked at just after a @defer to handle potentially re-entering the
                // same subgraph. When we take this, there's no point in looking for chaining since
                // we'll independently check the other edges already.
                if matches!(
                    edge_weight.transition,
                    QueryGraphEdgeTransition::KeyResolution
                ) {
                    let edge_head_weight = self.graph.node_weight(edge_head)?;
                    if edge_head_weight.source != edge_tail_weight.source {
                        heap.push(HeapElement(updated_path));
                    }
                }
            }
            if !does_non_collecting_next_edge_exist {
                // The subtlety here is that this may either mean that there are no non-collecting
                // edges from the tail of this path, or that there are some but that they're
                // considered "trivial" edges since `next_edges()` above may end up calling
                // `QueryGraph::non_trivial_followup_edges`. In the latter case, this means there is
                // a key we could use, but it get us back to the previous vertex in the path, which
                // is useless. But we distinguish that case to 1) make the error messaging more
                // helpful for debugging, and 2) much more importantly, to record a "dead-end" for
                // this path.
                let out_edges = to_advance
                    .graph
                    .out_edges(to_advance.tail)
                    .into_iter()
                    .filter(|edge_ref| !edge_ref.weight().transition.collect_operation_elements())
                    .map(|edge_ref| edge_ref.id())
                    .collect::<Vec<_>>();
                if out_edges.is_empty() {
                    debug!(
                        "Nothing to try for {}: it has no non-collecting outbound edges",
                        to_advance,
                    );
                } else {
                    debug!(
                        "Nothing to try for {}: it only has \"trivial\" non-collecting outbound edges",
                        to_advance,
                    );
                    let to_advance = to_advance.clone();
                    let original_source = original_source.clone();
                    dead_end_closures.push(Arc::new(LazyLock::new(Box::new(move || {
                        let to_advance_tail_weight = to_advance.graph.node_weight(to_advance.tail)?;
                        Ok(Unadvanceables(Arc::new(out_edges.into_iter().map(|edge| {
                            let edge_weight = to_advance.graph.edge_weight(edge)?;
                            let (edge_head, edge_tail) = to_advance.graph.edge_endpoints(edge)?;
                            let edge_head_weight = to_advance.graph.node_weight(edge_head)?;
                            let edge_tail_weight = to_advance.graph.node_weight(edge_tail)?;
                            Ok::<_, FederationError>(
                                if edge_tail_weight.source != to_advance_tail_weight.source
                                    && edge_tail_weight.source != original_source
                                {
                                    let Some(conditions) = &edge_weight.conditions else {
                                        bail!("@key field unexpectedly had no conditions");
                                    };
                                    Some(Arc::new(Unadvanceable {
                                        reason: UnadvanceableReason::IgnoredIndirectPath,
                                        from_subgraph: to_advance_tail_weight.source.clone(),
                                        to_subgraph: edge_tail_weight.source.clone(),
                                        details: format!(
                                            "ignoring moving to subgraph \"{}\" using @key(fields: \"{}\") of \"{}\" because there is a more direct path in {} that avoids {} altogether",
                                            edge_tail_weight.source,
                                            FieldSetDisplay(conditions),
                                            edge_head_weight.type_,
                                            edge_tail_weight.source,
                                            to_advance_tail_weight.source,
                                        )
                                    }))
                                } else {
                                    None
                                }
                            )
                        }).process_results(|iter| iter.flatten().collect())?)))
                    }))));
                }
            }
        }

        Ok(IndirectPaths {
            paths: Arc::new(
                best_path_by_source
                    .into_values()
                    .flatten()
                    .map(|p| p.0)
                    .collect(),
            ),
            dead_ends: UnadvanceableClosures(Arc::new(dead_end_closures)).into(),
        })
    }

    // PORT_NOTE: Named `conditionHasOverriddenFieldsInSource()` in the JS codebase, but we've
    // changed source to subgraph here to clarify that we don't need to handle the federated root
    // source.
    fn condition_has_overridden_fields_in_subgraph(
        subgraph_schema: &ValidFederationSchema,
        conditions: &SelectionSet,
    ) -> Result<bool, FederationError> {
        let Some(metadata) = subgraph_schema.subgraph_metadata() else {
            bail!("Selection set should originate from a federation subgraph schema");
        };
        let external_directive_definition_name = &metadata
            .federation_spec_definition()
            .external_directive_definition(subgraph_schema)?
            .name;
        let mut stack = vec![conditions];
        while let Some(selection_set) = stack.pop() {
            for selection in selection_set.selections.values() {
                if let Some(sub_selection_set) = selection.selection_set() {
                    stack.push(sub_selection_set);
                }
                if let Selection::Field(selection) = selection {
                    let field_pos = selection.field.field_position.clone();
                    // The subtlety here is that the definition of the fields in the conditions are
                    // not the one of the subgraph we care about here in general, because the
                    // conditions on key edges are those of the destination of the edge, and here
                    // we want to check if the field is overridden in the source of the edge. Hence,
                    // we get the matching definition in the input schema.
                    let Ok(type_pos_in_subgraph) =
                        subgraph_schema.get_type(field_pos.type_name().clone())
                    else {
                        continue;
                    };
                    let Ok(type_pos_in_subgraph) =
                        ObjectTypeDefinitionPosition::try_from(type_pos_in_subgraph)
                    else {
                        continue;
                    };
                    let field_pos_in_subgraph =
                        type_pos_in_subgraph.field(field_pos.field_name().clone());
                    let Ok(field_in_subgraph) = field_pos_in_subgraph.get(subgraph_schema.schema())
                    else {
                        continue;
                    };
                    let Some(application) = field_in_subgraph
                        .directives
                        .get(external_directive_definition_name)
                    else {
                        continue;
                    };
                    let arguments = metadata
                        .federation_spec_definition()
                        .external_directive_arguments(application)?;
                    let Some(reason) = arguments.reason else {
                        continue;
                    };
                    if reason == "[overridden]" {
                        return Ok(true);
                    }
                }
            }
        }
        Ok(false)
    }

    /// Checks whether the partial path starting at the given edge index has an alternative path
    /// starting from the given node, where only direct edges are considered, and returns the node
    /// such a path ends on if it exists. Additionally, this method checks that the ending node has
    /// the given type.
    // PORT_NOTE: In the JS codebase, this was named `checkDirectPathFromPreviousSubgraphTo`. We've
    // also generalized this method a bit by shifting certain logic into the caller.
    fn check_direct_path_from_node(
        &self,
        start_index: usize,
        start_node: NodeIndex,
        end_type_position: &OutputTypeDefinitionPosition,
        node_and_trigger_to_edge: impl Fn(
            &Arc<QueryGraph>,
            NodeIndex,
            &Arc<TTrigger>,
            &OverrideConditions,
        ) -> Result<Option<TEdge>, FederationError>,
        override_conditions: &OverrideConditions,
    ) -> Result<Option<NodeIndex>, FederationError> {
        // TODO: Temporary fix to avoid optimization if context exists, a permanent fix is here:
        //       https://github.com/apollographql/federation/pull/3017#pullrequestreview-2083949094
        if self.graph.is_context_used() {
            return Ok(None);
        }

        let mut current_node = start_node;
        for index in start_index..self.edges.len() {
            let trigger = &self.edge_triggers[index];
            let Some(edge) =
                node_and_trigger_to_edge(&self.graph, current_node, trigger, override_conditions)?
            else {
                return Ok(None);
            };

            // If the edge is `None`, this means the trigger doesn't require taking an edge (it's
            // typically an inline fragment with no type condition, just directives), which we can
            // always match.
            let Some(edge) = edge.into() else {
                continue;
            };

            // If the edge has conditions, we don't consider it a direct path as we don't know if
            // that condition can be satisfied and at what cost.
            let edge_weight = self.graph.edge_weight(edge)?;
            if edge_weight.conditions.is_some() {
                return Ok(None);
            }

            current_node = self.graph.edge_endpoints(edge)?.1;
        }

        // If we got here, that means we were able to match all the triggers on the partial path,
        // and so assuming we're on the proper type, we have a direct path from the start node.
        let current_node_weight = self.graph.node_weight(current_node)?;
        let QueryGraphNodeType::SchemaType(type_pos) = &current_node_weight.type_ else {
            return Ok(None);
        };
        Ok(if type_pos == end_type_position {
            Some(current_node)
        } else {
            None
        })
    }
}

// Query graph accessors
// - `self` is used to access the underlying query graph, not its path data.
// - These methods will be useful in other modules of the crate.
impl<TTrigger, TEdge> GraphPath<TTrigger, TEdge>
where
    TTrigger: GraphPathTriggerVariant + Display,
    TEdge: Copy + Into<Option<EdgeIndex>>,
    EdgeIndex: Into<TEdge>,
{
    pub(crate) fn schema_by_source(
        &self,
        source: &str,
    ) -> Result<&ValidFederationSchema, FederationError> {
        self.graph.schema_by_source(source)
    }

    pub(crate) fn edge_weight(
        &self,
        edge_index: EdgeIndex,
    ) -> Result<&QueryGraphEdge, FederationError> {
        self.graph.edge_weight(edge_index)
    }
}

/// `BinaryHeap::pop` returns the "greatest" element. We want the one with the fewest edges.
/// This wrapper compares by *reverse* comparison of edge count.
struct HeapElement<TTrigger, TEdge>(Arc<GraphPath<TTrigger, TEdge>>)
where
    TTrigger: Eq + Hash + GraphPathTriggerVariant,
    TEdge: Copy + Into<Option<EdgeIndex>>,
    EdgeIndex: Into<TEdge>;

impl<TTrigger, TEdge> PartialEq for HeapElement<TTrigger, TEdge>
where
    TTrigger: Eq + Hash + GraphPathTriggerVariant,
    TEdge: Copy + Into<Option<EdgeIndex>>,
    EdgeIndex: Into<TEdge>,
{
    fn eq(&self, other: &HeapElement<TTrigger, TEdge>) -> bool {
        self.0.edges.len() == other.0.edges.len()
    }
}

impl<TTrigger, TEdge> Eq for HeapElement<TTrigger, TEdge>
where
    TTrigger: Eq + Hash + GraphPathTriggerVariant,
    TEdge: Copy + Into<Option<EdgeIndex>>,
    EdgeIndex: Into<TEdge>,
{
}

impl<TTrigger, TEdge> PartialOrd for HeapElement<TTrigger, TEdge>
where
    TTrigger: Eq + Hash + GraphPathTriggerVariant,
    TEdge: Copy + Into<Option<EdgeIndex>>,
    EdgeIndex: Into<TEdge>,
{
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<TTrigger, TEdge> Ord for HeapElement<TTrigger, TEdge>
where
    TTrigger: Eq + Hash + GraphPathTriggerVariant,
    TEdge: Copy + Into<Option<EdgeIndex>>,
    EdgeIndex: Into<TEdge>,
{
    fn cmp(&self, other: &Self) -> Ordering {
        self.0.edges.len().cmp(&other.0.edges.len()).reverse()
    }
}

struct EdgeIndexDisplay<'graph, TEdge>
where
    TEdge: Copy + Into<Option<EdgeIndex>>,
    EdgeIndex: Into<TEdge>,
{
    _phantom_data_for_edge: PhantomData<TEdge>,
    graph: &'graph Arc<QueryGraph>,
    edge_index: EdgeIndex,
}

impl<'graph, TEdge> EdgeIndexDisplay<'graph, TEdge>
where
    TEdge: Copy + Into<Option<EdgeIndex>>,
    EdgeIndex: Into<TEdge>,
{
    fn new(edge_index: EdgeIndex, graph: &'graph Arc<QueryGraph>) -> Self {
        Self {
            _phantom_data_for_edge: Default::default(),
            graph,
            edge_index,
        }
    }
}

impl<TEdge> Display for EdgeIndexDisplay<'_, TEdge>
where
    TEdge: Copy + Into<Option<EdgeIndex>>,
    EdgeIndex: Into<TEdge>,
{
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let e = self.edge_index;
        let graph = self.graph.graph();
        let (head_idx, tail_idx) = graph.edge_endpoints(e).ok_or(std::fmt::Error)?;
        let head = &graph[head_idx];
        let tail = &graph[tail_idx];
        let edge = &graph[e];
        let transition = &edge.transition;
        write!(f, "{head} -> {tail} ({transition})")
    }
}

impl<TTrigger, TEdge> Display for GraphPath<TTrigger, TEdge>
where
    TTrigger: Eq + Hash + Display + GraphPathTriggerVariant,
    TEdge: Copy + Into<Option<EdgeIndex>>,
    EdgeIndex: Into<TEdge>,
{
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // If the path is length is 0 return "[]"
        // Traverse the path, getting the of the edge.
        let graph = self.graph.graph();
        let head = &graph[self.head];
        let head_is_root_node = head.is_root_node();
        if head_is_root_node && self.edges.is_empty() {
            return write!(f, "_");
        }
        if !head_is_root_node {
            write!(f, "{head}")?;
        }
        self.edges
            .iter()
            .cloned()
            .enumerate()
            .try_for_each(|(i, e)| match e.into() {
                Some(e) => {
                    let tail = graph.edge_endpoints(e).ok_or(std::fmt::Error)?.1;
                    let node = &graph[tail];
                    if i == 0 && head_is_root_node {
                        write!(f, "{node}")
                    } else {
                        let edge = &graph[e];
                        let label = edge.to_string();

                        if label.is_empty() {
                            write!(f, " --> {node}")
                        } else {
                            write!(f, " --[{label}]--> {node}")
                        }
                    }
                }
                None => write!(f, " ({}) ", self.edge_triggers[i].as_ref()),
            })?;
        if let Some(label) = self.defer_on_tail.as_ref().and_then(|d| d.label.as_ref()) {
            write!(f, "<defer='{label}'>")?;
        }
        if !self.runtime_types_of_tail.is_empty() {
            write!(f, " (types: [")?;
            let mut iter = self.runtime_types_of_tail.iter();
            if let Some(ty) = iter.next() {
                // First item
                write!(f, "{ty}")?;
                // The rest
                for ty in iter {
                    write!(f, " {ty}")?;
                }
            }
            write!(f, "])")?;
        }
        Ok(())
    }
}
